{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "填充nans、infs和naive_outlier"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pandas as pd\n",
    "import numpy as np\n",
    "import joblib\n",
    "from tsfresh.utilities.dataframe_functions import impute\n",
    "import os\n",
    "from joblib import Parallel, delayed"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# def load_sensor(data_no, idx):\n",
    "#     sensor = joblib.load('./sensors_id_sort/0%d/%d.lz4'%(data_no, idx))\n",
    "#     return sensor\n",
    "\n",
    "def gen_boxplot_bound(ser):\n",
    "    '''\n",
    "    描述：\n",
    "        输出箱线图的异常值边界,输出上下边界以及fillna()之后的结果\n",
    "    '''\n",
    "    ser = ser.fillna(method='pad')\n",
    "    tmp = ser.values.ravel()\n",
    "    q1 = np.percentile(tmp, 0.1)\n",
    "    q3 = np.percentile(tmp, 99.9)\n",
    "    iqr = q3 -q1\n",
    "    up_bound = q3 + 10*iqr\n",
    "    down_bound = q1 - 10*iqr\n",
    "    return up_bound, down_bound, ser\n",
    "    \n",
    "def naive_outliers_impute(df):\n",
    "    '''\n",
    "    描述：\n",
    "        naive_outliers->up_bound, down_bound\n",
    "    '''\n",
    "\n",
    "    for col in df.columns:\n",
    "        up_bound, down_bound, ser = gen_boxplot_bound(df[col])\n",
    "        \n",
    "        df.loc[ser<down_bound, col] = down_bound\n",
    "        df.loc[ser>up_bound, col] = up_bound\n",
    "    return df"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "def clean_sensor(df):\n",
    "    '''\n",
    "    描述：\n",
    "        填充nans->median, infs->minmax, naive_outliers->up_bound, down_bound\n",
    "    '''\n",
    "    df_1 = impute(df)\n",
    "    df_2 = naive_outliers_impute(df_1)\n",
    "    return df_2\n",
    "\n",
    "def clean_sensor_parallel(train_no, csv_nos, opt_func):\n",
    "    '''\n",
    "    描述：\n",
    "        并行清洗数据\n",
    "    参数：\n",
    "        train_no：第几个plc\n",
    "        csv_nos：plc对应的sensor文件个数\n",
    "    '''\n",
    "    \n",
    "    input_dir = './sensors_id_sort/0%d/'%train_no\n",
    "    output_dir = './sensors_clean/0%d/'%train_no\n",
    "\n",
    "    if not os.path.exists('./sensors_clean/'):\n",
    "        os.mkdir('./sensors_clean')\n",
    "    if not os.path.exists('./sensors_clean/0%d'%train_no):\n",
    "        os.mkdir('./sensors_clean/0%d'%train_no)\n",
    "    \n",
    "    def basis_func(idx):\n",
    "        sensor = joblib.load(input_dir + '%d.lz4'%idx)\n",
    "        tmp = opt_func(sensor)\n",
    "        joblib.dump(tmp, output_dir+'%d.lz4'%idx, compress='lz4')\n",
    "\n",
    "    Parallel(n_jobs=48,verbose=10)(delayed(basis_func)(i) for i in range(1,csv_nos+1))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.\n",
      "[Parallel(n_jobs=48)]: Done   3 out of  48 | elapsed:   53.1s remaining: 13.3min\n",
      "[Parallel(n_jobs=48)]: Done   8 out of  48 | elapsed:  1.3min remaining:  6.5min\n",
      "[Parallel(n_jobs=48)]: Done  13 out of  48 | elapsed:  1.8min remaining:  4.9min\n",
      "[Parallel(n_jobs=48)]: Done  18 out of  48 | elapsed:  1.9min remaining:  3.2min\n",
      "[Parallel(n_jobs=48)]: Done  23 out of  48 | elapsed:  2.0min remaining:  2.1min\n",
      "[Parallel(n_jobs=48)]: Done  28 out of  48 | elapsed:  2.0min remaining:  1.4min\n",
      "[Parallel(n_jobs=48)]: Done  33 out of  48 | elapsed:  2.0min remaining:   54.5s\n",
      "[Parallel(n_jobs=48)]: Done  38 out of  48 | elapsed:  2.0min remaining:   31.6s\n",
      "[Parallel(n_jobs=48)]: Done  43 out of  48 | elapsed:  2.0min remaining:   14.1s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.0min remaining:    0.0s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.0min finished\n"
     ]
    }
   ],
   "source": [
    "clean_sensor_parallel(1, 48, clean_sensor)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.\n",
      "[Parallel(n_jobs=48)]: Done   3 out of  48 | elapsed:  1.3min remaining: 19.8min\n",
      "[Parallel(n_jobs=48)]: Done   8 out of  48 | elapsed:  1.9min remaining:  9.5min\n",
      "[Parallel(n_jobs=48)]: Done  13 out of  48 | elapsed:  2.0min remaining:  5.4min\n",
      "[Parallel(n_jobs=48)]: Done  18 out of  48 | elapsed:  2.1min remaining:  3.4min\n",
      "[Parallel(n_jobs=48)]: Done  23 out of  48 | elapsed:  2.1min remaining:  2.3min\n",
      "[Parallel(n_jobs=48)]: Done  28 out of  48 | elapsed:  2.1min remaining:  1.5min\n",
      "[Parallel(n_jobs=48)]: Done  33 out of  48 | elapsed:  2.1min remaining:   57.0s\n",
      "[Parallel(n_jobs=48)]: Done  38 out of  48 | elapsed:  2.1min remaining:   33.2s\n",
      "[Parallel(n_jobs=48)]: Done  43 out of  48 | elapsed:  2.1min remaining:   14.8s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.1min remaining:    0.0s\n",
      "[Parallel(n_jobs=48)]: Done  48 out of  48 | elapsed:  2.1min finished\n"
     ]
    }
   ],
   "source": [
    "clean_sensor_parallel(2, 48, clean_sensor)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "[Parallel(n_jobs=48)]: Using backend LokyBackend with 48 concurrent workers.\n",
      "[Parallel(n_jobs=48)]: Done   2 out of  37 | elapsed:   43.5s remaining: 12.7min\n",
      "[Parallel(n_jobs=48)]: Done   6 out of  37 | elapsed:  1.0min remaining:  5.4min\n",
      "[Parallel(n_jobs=48)]: Done  10 out of  37 | elapsed:  1.4min remaining:  3.7min\n",
      "[Parallel(n_jobs=48)]: Done  14 out of  37 | elapsed:  1.5min remaining:  2.5min\n",
      "[Parallel(n_jobs=48)]: Done  18 out of  37 | elapsed:  1.5min remaining:  1.6min\n",
      "[Parallel(n_jobs=48)]: Done  22 out of  37 | elapsed:  1.5min remaining:  1.1min\n",
      "[Parallel(n_jobs=48)]: Done  26 out of  37 | elapsed:  1.6min remaining:   39.5s\n",
      "[Parallel(n_jobs=48)]: Done  30 out of  37 | elapsed:  1.6min remaining:   21.8s\n",
      "[Parallel(n_jobs=48)]: Done  34 out of  37 | elapsed:  1.6min remaining:    8.3s\n",
      "[Parallel(n_jobs=48)]: Done  37 out of  37 | elapsed:  1.6min finished\n"
     ]
    }
   ],
   "source": [
    "clean_sensor_parallel(3, 37, clean_sensor)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
